package cn.mhome.spring.thrift.server;

import java.lang.reflect.Constructor;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.thrift.TMultiplexedProcessor;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TTransportException;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;

import cn.mhome.easy.spring.thrift.core.exception.SelfUncaughtExceptionHandler;

/**
 * spring BeanFactory 获取thrift service bean
 * 
 * @author zhouwei
 * 
 */
public class RegisterServer implements BeanFactoryAware {

	private static final Log log = LogFactory.getLog(RegisterServer.class);

	private Map<Class<? extends TProcessor>, Class<?>> serviceImplClass;

	private BeanFactory beanFactory;

	protected int port;

	private static SelfUncaughtExceptionHandler su = new SelfUncaughtExceptionHandler();
	
	@Override
	public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
		this.beanFactory = beanFactory;
	}

	public void start() {
		new Thread() {
			public void run() {
				this.setUncaughtExceptionHandler(su);
				startServerInternal();
			}
		}.start();
	}

	protected void startServerInternal() {
		try {
			TProcessor process = getProcessor();
			// 使用高密度二进制协议
			TBinaryProtocol.Factory proFactory = new TBinaryProtocol.Factory();
			TNonblockingServerTransport trans = new TNonblockingServerSocket(
					port);
			TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(
					trans);
			args.maxReadBufferBytes = 1638400000;
			args.transportFactory(new TFramedTransport.Factory());
			args.protocolFactory(proFactory);
			args.processor(process);
			args.selectorThreads(24);
			args.workerThreads(64);
			TServer server = new TThreadedSelectorServer(args);
			for (Class<?> clazz : serviceImplClass.values()) {
				log.info("[Server] >>> "
						+ clazz.getSimpleName().replace("Impl", "")
						+ " is starting on port " + port + " protocal = "
						+ proFactory.getClass());
			}
			server.serve();

		} catch (TTransportException e) {
			log.error("Start server error!", e);
		}
	}

	@SuppressWarnings("unchecked")
	protected TMultiplexedProcessor getProcessor() {
		TMultiplexedProcessor tmprocessor = new TMultiplexedProcessor();
		for (Class<?> processorClazz : serviceImplClass.keySet()) {
			Class<?> implClazz = serviceImplClass.get(processorClazz);
			Constructor<TProcessor> constructor = (Constructor<TProcessor>) processorClazz.getConstructors()[0];
			Object serviceImpl = beanFactory.getBean(implClazz);
			TProcessor processor = BeanUtils.instantiateClass(constructor, serviceImpl);
			tmprocessor.registerProcessor(implClazz.getSimpleName().replace("Impl", ""), processor);
		}
		return tmprocessor;
	}

	public int getPort() {
		return port;
	}

	public void setPort(int port) {
		this.port = port;
	}

	public Map<Class<? extends TProcessor>, Class<?>> getServiceImplClass() {
		return serviceImplClass;
	}

	public void setServiceImplClass(Map<Class<? extends TProcessor>, Class<?>> serviceImplClass) {
		this.serviceImplClass = serviceImplClass;
	}
}
